package org.voovan.korla.rpc.service;

import org.voovan.Global;
import org.voovan.korla.KorlaStatic;
import org.voovan.korla.message.Callback;
import org.voovan.korla.rpc.RpcMethod;
import org.voovan.korla.rpc.RpcScaner;
import org.voovan.korla.rpc.RpcStatic;
import org.voovan.korla.rpc.message.RpcCall;
import org.voovan.korla.rpc.message.RpcNull;
import org.voovan.korla.rpc.message.RpcReturn;
import org.voovan.korla.socket.KorlaProvider;
import org.voovan.tools.json.JSON;
import org.voovan.tools.log.Logger;
import org.voovan.tools.reflect.TReflect;

import java.io.IOException;

import static org.voovan.korla.rpc.RpcStatic.registerClass;

/**
 * Rpc服务提供者
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class RpcProvider {
    static {
        registerClass();
    }

    private transient KorlaProvider korlaProvider;
    private transient RpcRetry rpcRetry;

    /**
     * 构造方法
     * @param korlaProvider  Korla 服务提供器
     */
    public RpcProvider(KorlaProvider korlaProvider) {
        this(korlaProvider, null);
    }

    /**
     * 构造方法
     * @param korlaProvider  Korla 服务提供器
     * @param rpcRetry Rpc 重试判断器
     */
    public RpcProvider(KorlaProvider korlaProvider, RpcRetry rpcRetry) {

        this.korlaProvider = korlaProvider;
        this.rpcRetry = rpcRetry;

        //扫描可以提供 RPC 服务的类
        RpcScaner.scan(RpcStatic.RPC_SCAN_PACKAGE);
    }

    /**
     * 获取 Rpc 重试判断器
      * @return Rpc 重试判断器
     */
    public RpcRetry getRpcRetry() {
        return rpcRetry;
    }

    /**
     * 设置 Rpc 重试判断器
     * @param rpcRetry Rpc 重试判断器
     */
    public void setRpcRetry(RpcRetry rpcRetry) {
        this.rpcRetry = rpcRetry;
    }

    /**
     * 获取 Korla 服务提供器
     * @return  Korla 服务提供器
     */
    public KorlaProvider getKorlaProvider() {
        return korlaProvider;
    }

    /**
     * 启用 Rpc 服务
     * @throws IOException IO异 常
     */
    public void serve() throws IOException {
        korlaProvider.addCallback(new Callback<RpcCall, RpcReturn>() {
            @Override
            public RpcReturn apply(RpcCall rpcCall) {

                RpcReturn rpcReturn = rpcCall.buildAnswer(RpcReturn.class);
                RpcMethod rpcMethod = RpcStatic.RPC_METHODS.get(rpcCall.getRpcMethodName());
                while(true) {
                    try {
                        Object result = rpcMethod.invoke(rpcCall.getParamArray());
                        rpcReturn.setReturnType(RpcReturn.RPC_RETURN_VALUE);
                        rpcReturn.setReturnValue(result);

                        //传递幂等性
                        rpcReturn.setIdempotence(KorlaStatic.DEFAULT_IDEMPOTENCE ? rpcMethod.isIdempotence() : false);
                    } catch (Exception e) {
                        //================重试检查, 通过 rpcRetry 来判断是否需要进行重试================
                        {
                            if (rpcCall.getRetryCount()< 100 &&
                                    rpcRetry != null &&
                                    rpcRetry.canRetry(rpcCall.incrementRetryCount(), e)) {

                                continue;
                            }
                        }

                        if(e.getCause()!=null) {
                            e = (Exception) e.getCause();
                        }

                        //================异常的业务处理逻辑================
                        {
                            String diagnostic = "";
                            if (e instanceof java.lang.IllegalArgumentException) {
                                diagnostic = " \r\nmethod except types: " + diagnostic + JSON.toJSON(rpcMethod.getParamTypes());
                                diagnostic = diagnostic + "\r\nparams actual types: " + JSON.toJSON(TReflect.getArrayClasses(rpcCall.getParamArray())) + "\r\n";
                            }

                            diagnostic = JSON.toJSON(rpcCall.getParamArray());

                            Logger.error("Rpc invoke failed, [name=" + rpcCall.getRpcMethodName() + ", id=" + rpcCall.getId() + "]" + diagnostic, e);

                            rpcReturn.setReturnType(RpcReturn.RPC_RETURN_EXCEPTION);
                            rpcReturn.setException(rpcCall.getRpcMethodName(), e.getMessage());

                            //传递幂等性, 异常不需要幂等
                            rpcReturn.setIdempotence(false);
                        }
                    }

                    rpcReturn.setForceFlush(rpcMethod.isForceFlush());
                    return rpcReturn;
                }
            }
        });

        korlaProvider.listen();
    }

    /**
     * Rpc 服务提供器
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @return RpcProvider 对象
     */
    public static RpcProvider newInstance(String host, int port, int readTimeout, int sendTimeout){
        try {
            KorlaProvider korlaProvider = new KorlaProvider(host, port, readTimeout, sendTimeout);
            return new RpcProvider(korlaProvider);
        } catch (Exception e) {
            Logger.error("Create RpcProvider failed", e);
        }

        return null;
    }

    /**
     * Rpc 服务提供器
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @return RpcProvider 对象
     */
    public static RpcProvider newInstance(String host, int port, int readTimeout, int sendTimeout, RpcRetry rpcRetry){
        try {
            KorlaProvider korlaProvider = new KorlaProvider(host, port, readTimeout, sendTimeout);

            if(korlaProvider.isListening()) {
                return new RpcProvider(korlaProvider, rpcRetry);
            }
        } catch (Exception e) {
            Logger.error("Create RpcProvider failed", e);
        }

        return null;
    }
}
